[Hadoop] 分布式处理框架MapReduce 一

MapReduce概述、MapReduce计算场景、MapReduce编程模型(MapReduce编程模型之通过wordcount词频统计分析案例入门)、MapReduce Java API(MapReduce-Map、MapReduce-Reduce)、MapReduce执行步骤(整体 Example 图、Word Count Example 图、Map 数据输入、Reduce 数据输入、Word Count中的shuffle图、Shuffle、Shuffle Map端、Shuffle Reduce端、Combiner、Combiner Example图、核心概念)、MapReduce架构(1.X、2.X)

Posted by 李玉坤 on 2017-06-12

MapReduce概述

MapReduce是一个易于编写应用程序的软件框架,该应用程序以一种可靠的、容错的方式,在大型的商品硬件集群(数千个节点)上并行处理大量数据(多tb数据集)。

  • 源自于Google的MapReduce论文,论文发表于2004年22月
  • Hadoop MapReduce是Google MapReduce的克隆版
  • MapReduce优点:海量数量离线处理&易开发&易运行
    易开发:通过MapReduce固定框架来开发,开发者只需要关注业务逻辑即可,无需关心分布式结构。(其实与spark比较易开发&易运行这两点真是不敢恭维,这里只不过是官方给的一个说法而已)
  • MapReduce缺点:实时流式计算

MapReduce计算场景

  • 数据查找
    • 分布式Grep
  • Web访问日志分析
    • 词频统计
    • 网站PV(某个网页点击一次pv+1) UV(一万个用户点击就是uv一万次)统计
    • Top K问题
  • 倒排索引
    • 建立搜索引擎索引
  • 分布式排序

MapReduce编程模型

MapReduce编程模型之通过wordcount词频统计分析案例入门

wordcount: 统计文件中每个单词出现的次数

需求:求wc
hello.txt文件
 hadoop welcome
 hadoop hdfs mapreduce
 hadoop hdfs

  • 如何统计hello.txt文本中单词的出现次数?
    • Bash命令实现
      • tr -s “ “ “\n”
      • sort file
      • uniq –c
      • cat wordcount.txt | tr -s “ “ “\n” | sort | uniq -c
    • 单机版实现
      • 使用HashMap统计
    • 如果数据量极大如何在分布式的机器上计算?
  • MapReduce使用了分治思想简化了计算处理模型为两步:
    • Map阶段
      • 获得输入数据
      • 对输入数据进行转换并输出
    • Reduce阶段
      • 对输出结果进行聚合计算

1) 文件内容小:shell

1
cat hello.txt |sed 's/t[,.:;/!?]/ /g'|awk '(for(i=1;i<=NF;i++)array[$i]++;}END(for(i in array) print i, array [i]}'

执行上诉脚本即可得到结果

2)文件内容很大: TB GB ???? 如何解决大数据量的统计分析
借助于分布式计算框架来解决了: mapreduce


MapReduce任务通常将输入数据集分割成独立的块,由map任务以完全并行的方式处理。框架对映射的输出进行排序,然后将这些输出输入到reduce任务中。通常,作业的输入和输出都存储在文件系统中。框架负责调度任务,监视任务,并重新执行失败的任务。

根据上面的图进行理解,MapReduce框架只对<key,value>对进行操作,也就是说,框架将作业的输入视为一组<key,value对,并生成一组<key,value>对作为作业的输出,可以认为是不同类型的。

MapReduce Java API

  • 基于Hadoop 2.6.0版本
  • 新版本API均在包org.apache.hadoop.mapreduce下面
  • 编写MapReduce程序的核心
    • 继承Hadoop提供的Mapper类并实现其中的map方法
      public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    • 继承Hadoop提供的Reducer类并实现其中的reduce方法
      public class Reducer<KEYIN,VALUEIN, KEYOUT,VALUEOUT>

MapReduce-Map

  • 对输入数据集的每个值都执行函数以创建新的结果集合
  • 例如:
    • 输入数据[1,2,3,4,5,6,7,8,9,10]
    • 定义Map方法需要执行的变换为f(x)=sin(x)
    • 则输出结果为[0.84,0.91,0.14,-0.76,-0.96,-0.28,0.66,0.99,0.41,-0.54]
  • 形式化的表达:
    • each <key, value> in input
    • map <key, value> to <intermediate key, intermediate value>
  • Map 映射 参数默认2个 但是实际不会根据默认走,这个和block块有很大的关系

MapReduce-Reduce

  • 对Map输出的结果进行聚合,输出一个或者多个聚合结果
  • 例如:
    • Map的输出结果为[0.84,0.91,0.14,-0.76,-0.96,-0.28,0.66,0.99,0.41,-0.54]
    • 使用求和+作为聚合方法
    • 则输出结果为[1.41]
  • 形式化的表达:
    • each <intermediate key, List> in input
    • reduce<reduce key, reduce value>
  • Reduce 聚合 参数默认1个
  • 相同的key会被划分到同一个Reduce上

MapReduce执行步骤

  • 准备map处理的输入数据
  • Mapper处理
  • Shuffle
  • Reduce处理
  • 结果输出

整体 Example 图

Word Count Example 图

Map 数据输入

  • Map阶段由一定的数量的Map Task组成
  • 文件分片
    • 输入数据会被split切分为多份
    • HDFS默认Block大小
      • Hadoop 1.0 = 64MB
      • Hadoop 2.0 = 128MB
    • 默认将文件解析为<key, value>对的实现是TextInputFormat
      • key为偏移量
      • value为每一行内容
  • 因此有多少个Map Task任务?
    • 一个split一个Map Task,默认情况下一个block对应一个split
    • 例如一个文件大小为10TB,block大小设置为128MB,则一共会有81920个Map Task任务(10 * 1024 * 1024 / 128 = 81920)

Reduce 数据输入

  • Partitioner决定了哪个Reduce会接收到Map输出的<key, value>对
  • 在Hadoop中默认的Partitioner实现为HashPartitioner
  • 计算公式
    • Abs(Hash(key)) mod NR 其中 NR等于Reduce Task数目
  • Partitioner可以自定义
  • 例如
    • 有3个Reduce Task
    • 那么Partitioner会返回0 ~ 2

Word Count中的shuffle图

Shuffle

  • 为何需要shuffle
    • Reduce阶段的数据来源于不同的Map
  • Shuffle由Map端和Reduce端组成
  • Shuffle的核心机制
    • 数据分区+排序
  • Map端
    • 对Map输出结果进行spill(溢写)
  • Reduce端
    • 拷贝Map端输出结果到本地
    • 对拷贝的数据进行归并排序

Shuffle Map端

  • Map端会源源不断的把数据输入到一个环形内存缓冲区
  • 达到一定阈值时
    • 新启动一个线程
    • 内存缓冲区中的数据会溢出到磁盘
  • 在溢出的过程中
    • 调用Partitioner进行分组
    • 对于每个组,按照Key进行排序
  • Map处理完毕后
    • 对溢出到磁盘上的多个文件进行Merge操作
    • 合并为一个大的文件和一个索引文件

Shuffle Reduce端

  • Map端完成之后会暴露一个Http Server共Reduce端获取数据
  • Reduce启动拷贝线程从各个Map端拷贝结果
    • 有大量的网络I/O开销
  • 一边拷贝一边进行Merge操作(归并排序)

Combiner

  • Map端本地Reducer
  • 合并了Map端输出数据 => 减少Http Traffic
  • Combiner可以自定义
  • 例如Word Count中
    • 对同一个Map输出的相同的key,直接对其value进行reduce
  • 可以使用Combiner的前提
    • 满足结合律:求最大值、求和
    • 不适用场景:计算平均数

Combiner Example图

核心概念

  • Split:交由MapReduce作业来处理的数据块,是MapReduce中最小的计算单元
    HDFS:blocksize 是HDFS中最小的存储单元 128M
    默认情况下:他们两是一一对应的,当然我们也可以手工设置他们之间的关系(不建议)
  • InputFormat:
    将我们的输入数据进行分片(split): InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    TextInputFormat: 处理文本格式的数据
  • OutputFormat: 输出
  • Combiner(在后面代码部分)
  • Partitioner(在后面代码部分)

MapReduce架构

MapReduce1.X

MapReduce1.X架构

  1. JobTracker: JT
    作业的管理者 (管理的)
    将作业分解成一堆的任务:Task(MapTask和ReduceTask)
    将任务分派给TaskTracker运行
    作业的监控、容错处理(task作业挂了,重启task的机制)
    在一定的时间间隔内,JT没有收到TT的心跳信息,TT可能是挂了,TT上运行的任务会被指派到其他TT上去执行

  2. TaskTracker: TT
    任务的执行者 (干活的)
    在TT上执行我们的Task(MapTask和ReduceTask)
    会与JT进行交互:执行/启动/停止作业,发送心跳信息给JT

  3. MapTask
    自己开发的map任务交由该Task出来
    解析(比如空格或者tab键分割)每条记录的数据,交给自己的map方法处理
    将map的输出结果写到本地磁盘(有些作业只仅有map没有reduce==>HDFS)

  4. ReduceTask
    将Map Task输出的数据进行读取
    按照数据进行分组传给我们自己编写的reduce方法处理
    输出结果写到HDFS

MapReduce2.X

其实直接分为了yarn与mapreduce